ComposableTimeSeriesForestRegressor (sktime-style TSF)#
The Time-Series Forest (TSF) is an interval-based ensemble: each tree trains on summary features computed over random intervals of the input series.
This notebook implements a ComposableTimeSeriesForestRegressor([…]) inspired by sktime’s TimeSeriesForestRegressor family:
You choose the summary functions (e.g., mean/std/slope) via a list.
Each tree samples random intervals, extracts features, and fits a
DecisionTreeRegressor. Predictions are the average across trees.
We’ll also show how to use it for one-step-ahead forecasting via a sliding-window reduction.
Goals#
Explain TSF intuition and why interval features work
Implement ComposableTimeSeriesForestRegressor([...]) in NumPy + scikit-learn
Demonstrate multi-seasonal forecasting via a windowed regression dataset
Plot forecast behavior and residual diagnostics with Plotly
Algorithm sketch#
Given training pairs \((X_i, y_i)\) where each \(X_i\) is a univariate (or multivariate) time series of fixed length \(m\).
For each tree \(b=1,\dots,B\):
Sample \(K\) random intervals \([s_k, e_k)\) with \(0\le s_k < e_k \le m\).
For each interval, compute a set of summary features \(f_1,\dots,f_F\) (e.g., mean/std/slope) on that segment.
Concatenate all interval features into a feature vector \(\phi_b(X_i)\).
Fit a decision tree regressor \(T_b\) on \((\phi_b(X_i), y_i)\).
Prediction is the ensemble average: \[\hat{y}(X) = \frac{1}{B}\sum_{b=1}^{B} T_b(\phi_b(X)).\]
Why it works (intuition): random intervals capture local patterns (level shifts, spikes, seasonal fragments, trend segments). Trees then learn which interval statistics matter.
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import os
import plotly.io as pio
from sklearn.tree import DecisionTreeRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from scipy import stats
pio.renderers.default = os.environ.get("PLOTLY_RENDERER", "notebook")
pio.templates.default = "plotly_white"
rng = np.random.default_rng(7)
# Record the library versions behind the printed outputs below, for reproducibility.
import numpy, pandas, sklearn, scipy, plotly
print("numpy:", numpy.__version__)
print("pandas:", pandas.__version__)
print("sklearn:", sklearn.__version__)
print("scipy:", scipy.__version__)
print("plotly:", plotly.__version__)
numpy: 1.26.2
pandas: 2.1.3
sklearn: 1.6.0
scipy: 1.15.0
plotly: 6.5.2
def _as_3d_panel(X: np.ndarray) -> np.ndarray:
"""Accept (n, m) or (n, d, m). Return (n, d, m)."""
X = np.asarray(X, dtype=float)
if X.ndim == 2:
return X[:, None, :]
if X.ndim == 3:
return X
raise ValueError(f"X must be 2D or 3D, got shape={X.shape}")
def _acf(x: np.ndarray, max_lag: int) -> tuple[np.ndarray, np.ndarray]:
x = np.asarray(x, dtype=float)
x = x - x.mean()
denom = float(np.dot(x, x))
lags = np.arange(max_lag + 1)
values = np.zeros(max_lag + 1)
values[0] = 1.0
if denom == 0.0:
return lags, values
for k in range(1, max_lag + 1):
values[k] = float(np.dot(x[k:], x[:-k]) / denom)
return lags, values
def _feature_mean(seg2d: np.ndarray) -> np.ndarray:
return np.mean(seg2d, axis=1)
def _feature_std(seg2d: np.ndarray) -> np.ndarray:
return np.std(seg2d, axis=1, ddof=0)
def _feature_min(seg2d: np.ndarray) -> np.ndarray:
return np.min(seg2d, axis=1)
def _feature_max(seg2d: np.ndarray) -> np.ndarray:
return np.max(seg2d, axis=1)
def _feature_slope(seg2d: np.ndarray) -> np.ndarray:
"""Least-squares slope vs time index within the interval."""
n = seg2d.shape[1]
if n < 2:
return np.zeros(seg2d.shape[0])
t = np.arange(n, dtype=float)
t = t - t.mean()
denom = float(np.dot(t, t))
y = seg2d - seg2d.mean(axis=1, keepdims=True)
return (y @ t) / denom
# Registry of built-in per-interval summary statistics, keyed by the string
# names accepted in `feature_functions`. Each value maps a (n_samples,
# interval_length) matrix to a (n_samples,) feature vector.
_BUILTIN_FEATURES: dict[str, callable] = {
    "mean": _feature_mean,
    "std": _feature_std,
    "slope": _feature_slope,
    "min": _feature_min,
    "max": _feature_max,
}
def _resolve_feature_functions(feature_functions) -> list[tuple[str, callable]]:
"""Return list of (name, f(seg2d)->(n_samples,))."""
if feature_functions is None:
feature_functions = ["mean", "std", "slope"]
resolved: list[tuple[str, callable]] = []
for i, ff in enumerate(feature_functions):
if isinstance(ff, str):
if ff not in _BUILTIN_FEATURES:
raise ValueError(f"Unknown feature '{ff}'. Available: {sorted(_BUILTIN_FEATURES)}")
resolved.append((ff, _BUILTIN_FEATURES[ff]))
continue
if isinstance(ff, tuple) and len(ff) == 2 and isinstance(ff[0], str) and callable(ff[1]):
name, func = ff
resolved.append((name, func))
continue
if callable(ff):
name = getattr(ff, "__name__", f"feature_{i}")
def _wrapped(seg2d: np.ndarray, _f=ff) -> np.ndarray:
return np.apply_along_axis(_f, 1, seg2d).astype(float)
resolved.append((name, _wrapped))
continue
raise TypeError("feature_functions must contain str, callable, or (name, callable)")
# make names unique
seen: dict[str, int] = {}
unique: list[tuple[str, callable]] = []
for name, func in resolved:
if name not in seen:
seen[name] = 1
unique.append((name, func))
else:
seen[name] += 1
unique.append((f"{name}_{seen[name]}", func))
return unique
def _random_intervals(
n_timepoints: int,
n_intervals: int,
*,
min_length: int,
max_length: int | None,
rng: np.random.Generator,
) -> list[tuple[int, int]]:
n_timepoints = int(n_timepoints)
min_length = int(min_length)
if min_length < 2:
min_length = 2
if max_length is None:
max_length = n_timepoints
max_length = int(min(max_length, n_timepoints))
if min_length > max_length:
raise ValueError("min_length cannot exceed max_length")
intervals: list[tuple[int, int]] = []
for _ in range(int(n_intervals)):
length = int(rng.integers(min_length, max_length + 1))
start = int(rng.integers(0, n_timepoints - length + 1))
end = start + length
intervals.append((start, end))
return intervals
def _transform_intervals(
X3: np.ndarray,
intervals: list[tuple[int, int]],
feature_functions: list[tuple[str, callable]],
) -> np.ndarray:
n, d, _m = X3.shape
features: list[np.ndarray] = []
for dim in range(d):
Xd = X3[:, dim, :]
for (start, end) in intervals:
seg = Xd[:, start:end]
for _name, func in feature_functions:
features.append(func(seg).reshape(n, 1))
if not features:
return np.zeros((n, 0), dtype=float)
return np.concatenate(features, axis=1)
class ComposableTimeSeriesForestRegressor:
    """sktime-style TSF regressor (interval features + tree ensemble).

    Each tree gets its own random interval set; intervals are summarized by
    the configured feature functions and the resulting feature matrix is fed
    to a DecisionTreeRegressor. Predictions are averaged across trees.

    Parameters
    ----------
    feature_functions : list
        List of features to compute per interval.
        Each entry can be:
        - str in {"mean","std","slope","min","max"}
        - callable that maps 1D -> scalar
        - (name, callable) where callable maps seg2d -> vector (n_samples,)
    n_estimators : int
        Number of trees.
    n_intervals : int | "sqrt"
        Number of random intervals per tree. "sqrt" uses int(sqrt(m)).
    min_interval_length : int
        Minimum interval length.
    max_interval_length : int | None
        Maximum interval length (None -> full length).
    bootstrap : bool
        Bootstrap samples per tree.
    random_state : int | None
        Seed.
    tree_params : dict | None
        Passed to sklearn DecisionTreeRegressor.
    """

    def __init__(
        self,
        feature_functions=None,
        *,
        n_estimators: int = 200,
        n_intervals: int | str = "sqrt",
        min_interval_length: int = 3,
        max_interval_length: int | None = None,
        bootstrap: bool = True,
        random_state: int | None = None,
        tree_params: dict | None = None,
    ):
        self.feature_functions = feature_functions
        self.n_estimators = int(n_estimators)
        self.n_intervals = n_intervals
        self.min_interval_length = int(min_interval_length)
        self.max_interval_length = max_interval_length
        self.bootstrap = bool(bootstrap)
        self.random_state = random_state
        # copy so later mutation of the caller's dict cannot affect fitted trees
        self.tree_params = {} if tree_params is None else dict(tree_params)
        # fitted state, populated by fit() (trailing underscore = sklearn convention)
        self.feature_functions_: list[tuple[str, callable]] | None = None
        self.intervals_: list[list[tuple[int, int]]] | None = None
        self.estimators_: list[DecisionTreeRegressor] | None = None
        self.n_timepoints_: int | None = None
        self.n_dims_: int | None = None

    def fit(self, X, y):
        """Fit one tree per (random interval set, optional bootstrap sample).

        X may be (n, m) or (n, d, m); y must have n entries. Returns self.
        """
        X3 = _as_3d_panel(X)
        y = np.asarray(y, dtype=float)
        if X3.shape[0] != y.shape[0]:
            raise ValueError("X and y must have the same number of samples")
        n, d, m = X3.shape
        self.n_timepoints_ = int(m)
        self.n_dims_ = int(d)
        self.feature_functions_ = _resolve_feature_functions(self.feature_functions)
        if isinstance(self.n_intervals, str):
            if self.n_intervals != "sqrt":
                raise ValueError("n_intervals must be int or 'sqrt'")
            n_intervals = max(1, int(np.sqrt(m)))
        else:
            n_intervals = max(1, int(self.n_intervals))
        rng = np.random.default_rng(self.random_state)
        self.estimators_ = []
        self.intervals_ = []
        for _ in range(self.n_estimators):
            # derive an independent per-tree seed from the master stream so
            # interval sampling, bootstrapping, and tree fitting are reproducible
            seed = int(rng.integers(0, 2**32 - 1))
            r = np.random.default_rng(seed)
            intervals = _random_intervals(
                n_timepoints=m,
                n_intervals=n_intervals,
                min_length=self.min_interval_length,
                max_length=self.max_interval_length,
                rng=r,
            )
            Phi = _transform_intervals(X3, intervals, self.feature_functions_)
            if self.bootstrap:
                # sample n rows with replacement for this tree
                idx = r.integers(0, n, size=n)
                Phi_fit = Phi[idx]
                y_fit = y[idx]
            else:
                Phi_fit = Phi
                y_fit = y
            tree = DecisionTreeRegressor(random_state=seed, **self.tree_params)
            tree.fit(Phi_fit, y_fit)
            self.estimators_.append(tree)
            self.intervals_.append(intervals)
        return self

    def predict(self, X) -> np.ndarray:
        """Average per-tree predictions; X must match the fitted (d, m) shape."""
        if self.estimators_ is None or self.intervals_ is None or self.feature_functions_ is None:
            raise RuntimeError("Call fit() before predict().")
        X3 = _as_3d_panel(X)
        n, d, m = X3.shape
        if m != self.n_timepoints_ or d != self.n_dims_:
            raise ValueError(
                f"X must have shape (n,{self.n_dims_},{self.n_timepoints_}) (or (n,{self.n_timepoints_})), got {X3.shape}"
            )
        preds = np.zeros(n, dtype=float)
        for tree, intervals in zip(self.estimators_, self.intervals_):
            # each tree must see features from its own interval set
            Phi = _transform_intervals(X3, intervals, self.feature_functions_)
            preds += tree.predict(Phi)
        return preds / len(self.estimators_)
Demo: multi-seasonal one-step forecasting via sliding windows#
We generate a single time series with two seasonalities (weekly + ~monthly) and correlated noise.
Then we build a supervised dataset:
input: the last \(L\) observations
target: the next observation
This reduction turns forecasting into regression on fixed-length time series windows.
def simulate_ar1_noise(n: int, *, phi: float, sigma: float, rng: np.random.Generator) -> np.ndarray:
    """Simulate an AR(1) process u[t] = phi * u[t-1] + eps[t] with u[0] = 0."""
    innovations = rng.normal(0.0, sigma, size=n)
    series = np.zeros(n)
    for step in range(1, n):
        series[step] = phi * series[step - 1] + innovations[step]
    return series
# --- Build a synthetic daily series: trend + weekly + ~monthly cycles + AR(1) noise ---
n = 700
idx = pd.date_range("2022-01-01", periods=n, freq="D")
t = np.arange(n)
# two seasonal components: period 7 (weekly) and period 30 (roughly monthly)
weekly = 2.0 * np.sin(2 * np.pi * t / 7) + 0.5 * np.cos(2 * np.pi * t / 7)
monthly = 1.3 * np.sin(2 * np.pi * t / 30) - 0.4 * np.cos(2 * np.pi * t / 30)
trend = 0.02 * t  # slow linear drift
noise = simulate_ar1_noise(n, phi=0.6, sigma=0.7, rng=rng)  # serially correlated noise
y = 50.0 + trend + weekly + monthly + noise
y = pd.Series(y, index=idx, name="y")
fig = go.Figure()
fig.add_trace(go.Scatter(x=y.index, y=y, name="y", line=dict(color="black")))
fig.update_layout(title="Synthetic multi-seasonal series", xaxis_title="date", yaxis_title="value")
fig.show()
def make_sliding_windows(y: np.ndarray, window_length: int) -> tuple[np.ndarray, np.ndarray]:
    """Reduce a series to supervised pairs: (window of L past values, next value).

    Returns X of shape (len(y) - L, L) and the aligned one-step-ahead targets.
    Raises ValueError for windows shorter than 2 or series no longer than L.
    """
    values = np.asarray(y, dtype=float)
    width = int(window_length)
    if width < 2:
        raise ValueError("window_length must be >= 2")
    if values.size <= width:
        raise ValueError("y is too short for the chosen window_length")
    n_windows = values.size - width
    X = np.stack([values[j : j + width] for j in range(n_windows)], axis=0)
    return X, values[width:]
# --- Windowed reduction, time-aware split, fit, and test-horizon metrics ---
L = 60  # window length: long enough to span both the weekly and ~monthly cycles
X, y_next = make_sliding_windows(y.to_numpy(), window_length=L)
# time-aware split: the last h observations form the test horizon (no shuffling)
h = 120
X_train, y_train = X[:-h], y_next[:-h]
X_test, y_test = X[-h:], y_next[-h:]
t_test = y.index[-h:]
tsf = ComposableTimeSeriesForestRegressor(
    ["mean", "std", "slope", "min", "max"],
    n_estimators=150,
    n_intervals="sqrt",
    min_interval_length=5,
    bootstrap=True,
    random_state=7,
    tree_params={"max_depth": 12, "min_samples_leaf": 3},
)
tsf.fit(X_train, y_train)
y_pred = tsf.predict(X_test)
mae = mean_absolute_error(y_test, y_pred)
# FIX: mean_squared_error(..., squared=False) was removed in scikit-learn 1.6
# (it raised the TypeError captured below); take the square root explicitly,
# which works on every sklearn version.
rmse = float(np.sqrt(mean_squared_error(y_test, y_pred)))
r2 = r2_score(y_test, y_pred)
print(f"MAE: {mae:.3f}")
print(f"RMSE: {rmse:.3f}")
print(f"R^2: {r2:.3f}")
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Cell In[5], line 37
34 y_pred = tsf.predict(X_test)
36 mae = mean_absolute_error(y_test, y_pred)
---> 37 rmse = mean_squared_error(y_test, y_pred, squared=False)
38 r2 = r2_score(y_test, y_pred)
40 print(f"MAE: {mae:.3f}")
File ~/miniconda3/lib/python3.12/site-packages/sklearn/utils/_param_validation.py:194, in validate_params.<locals>.decorator.<locals>.wrapper(*args, **kwargs)
191 func_sig = signature(func)
193 # Map *args/**kwargs to the function signature
--> 194 params = func_sig.bind(*args, **kwargs)
195 params.apply_defaults()
197 # ignore self/cls and positional/keyword markers
File ~/miniconda3/lib/python3.12/inspect.py:3277, in Signature.bind(self, *args, **kwargs)
3272 def bind(self, /, *args, **kwargs):
3273 """Get a BoundArguments object, that maps the passed `args`
3274 and `kwargs` to the function's signature. Raises `TypeError`
3275 if the passed arguments can not be bound.
3276 """
-> 3277 return self._bind(args, kwargs)
File ~/miniconda3/lib/python3.12/inspect.py:3266, in Signature._bind(self, args, kwargs, partial)
3256 raise TypeError(
3257 'got some positional-only arguments passed as '
3258 'keyword arguments: {arg!r}'.format(
(...) 3263 ),
3264 )
3265 else:
-> 3266 raise TypeError(
3267 'got an unexpected keyword argument {arg!r}'.format(
3268 arg=next(iter(kwargs))))
3270 return self._bound_arguments_cls(self, arguments)
TypeError: got an unexpected keyword argument 'squared'
# One-step-ahead predictions over the test horizon: actual vs. model output
fig = go.Figure()
fig.add_trace(go.Scatter(x=t_test, y=y_test, name="actual", line=dict(color="black")))
fig.add_trace(go.Scatter(x=t_test, y=y_pred, name="pred (1-step)", line=dict(color="#4E79A7")))
fig.update_layout(title="TSF one-step-ahead predictions", xaxis_title="date", yaxis_title="value")
fig.show()
# Actual-vs-predicted scatter; points close to the y=x diagonal indicate accuracy
fig = go.Figure()
fig.add_trace(go.Scatter(x=y_test, y=y_pred, mode="markers", name="points", marker=dict(opacity=0.7)))
# shared axis range so the reference diagonal spans the full data extent
lo = min(float(np.min(y_test)), float(np.min(y_pred)))
hi = max(float(np.max(y_test)), float(np.max(y_pred)))
fig.add_trace(go.Scatter(x=[lo, hi], y=[lo, hi], mode="lines", name="y=x", line=dict(color="black", dash="dash")))
fig.update_layout(title="Actual vs predicted", xaxis_title="actual", yaxis_title="predicted")
fig.show()
def recursive_forecast(model, history: np.ndarray, steps: int, window_length: int) -> np.ndarray:
    """Multi-step forecast by iterating one-step predictions.

    Each predicted value is appended to the history and fed back as input
    for the next step (errors therefore compound over the horizon).
    """
    buffer = np.asarray(history, dtype=float).tolist()
    forecasts: list[float] = []
    for _ in range(int(steps)):
        window = np.asarray(buffer[-window_length:], dtype=float).reshape(1, -1)
        next_val = float(model.predict(window)[0])
        forecasts.append(next_val)
        buffer.append(next_val)
    return np.asarray(forecasts)
# Compare recursive multi-step vs ground-truth over a shorter horizon
h2 = 45
start = len(y) - h  # forecast origin = first timestamp of the test horizon
hist = y.to_numpy()[:start]
truth = y.to_numpy()[start : start + h2]
t_h2 = y.index[start : start + h2]
rec = recursive_forecast(tsf, history=hist, steps=h2, window_length=L)
fig = go.Figure()
# show the last 120 observed points for context before the forecast origin
fig.add_trace(go.Scatter(x=y.index[start - 120 : start], y=y.to_numpy()[start - 120 : start], name="history", line=dict(color="rgba(0,0,0,0.35)")))
fig.add_trace(go.Scatter(x=t_h2, y=truth, name="truth", line=dict(color="black")))
fig.add_trace(go.Scatter(x=t_h2, y=rec, name="recursive forecast", line=dict(color="#E15759")))
fig.update_layout(title="Recursive multi-step forecast (reduction)", xaxis_title="date", yaxis_title="value")
fig.show()
# Residual diagnostics (test horizon): time plot, histogram, ACF, and QQ plot
resid = y_test - y_pred
print("residual mean:", float(np.mean(resid)))
print("residual std:", float(np.std(resid, ddof=1)))
print("Jarque-Bera:", stats.jarque_bera(resid))  # normality test on the residuals
lags, acf_vals = _acf(resid, max_lag=30)
bound = 1.96 / np.sqrt(resid.size)  # approximate 95% white-noise band for the ACF
# QQ data: standardized residual quantiles vs. standard-normal quantiles
nq = resid.size
p = (np.arange(1, nq + 1) - 0.5) / nq  # plotting positions
theoretical = stats.norm.ppf(p)
sample_q = np.sort((resid - resid.mean()) / resid.std(ddof=1))
fig = make_subplots(
    rows=2,
    cols=2,
    subplot_titles=("Residuals over time", "Residual histogram", "Residual ACF", "QQ plot (std residuals)"),
)
fig.add_trace(go.Scatter(x=t_test, y=resid, name="residuals", line=dict(color="#4E79A7")), row=1, col=1)
fig.add_hline(y=0, line=dict(color="black", dash="dash"), row=1, col=1)
fig.add_trace(go.Histogram(x=resid, nbinsx=25, name="hist", marker_color="#4E79A7"), row=1, col=2)
fig.add_trace(go.Bar(x=lags, y=acf_vals, name="ACF(resid)", marker_color="#4E79A7"), row=2, col=1)
# dashed gray lines mark the +/- white-noise significance band
fig.add_trace(go.Scatter(x=[0, lags.max()], y=[bound, bound], mode="lines", line=dict(color="gray", dash="dash"), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(x=[0, lags.max()], y=[-bound, -bound], mode="lines", line=dict(color="gray", dash="dash"), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(x=theoretical, y=sample_q, mode="markers", name="QQ", marker=dict(color="#4E79A7")), row=2, col=2)
# 45-degree reference line for the QQ panel
fig.add_trace(
    go.Scatter(x=[theoretical.min(), theoretical.max()], y=[theoretical.min(), theoretical.max()], mode="lines", line=dict(color="black", dash="dash"), showlegend=False),
    row=2,
    col=2,
)
fig.update_layout(height=750, title="Residual diagnostics (test horizon)")
fig.show()
residual mean: 2.022139488149232
residual std: 1.7655684826389506
Jarque-Bera: SignificanceResult(statistic=2.147517772919136, pvalue=0.3417216075363839)
# Visual intuition: show random intervals used by one tree on the last input window
tree_idx = 0
intervals = tsf.intervals_[tree_idx]
window = X_test[-1]
x_axis = np.arange(window.size)
fig = go.Figure()
fig.add_trace(go.Scatter(x=x_axis, y=window, name="window", line=dict(color="black")))
for (s, e) in intervals[:12]:  # cap at 12 shaded bands to keep the plot readable
    fig.add_vrect(x0=s, x1=e - 1, fillcolor="rgba(78,121,167,0.10)", line_width=0)
fig.update_layout(
    title=f"Example random intervals (tree {tree_idx}) over one input window",
    xaxis_title="lag index in window",
    yaxis_title="value",
)
fig.show()